In this part of the tutorial, we will see how to query RDF Streams using an RDF Stream Processing (RSP) engine. The CSPARQL engine is an RSP engine that does not offer reasoning capabilities. Although extensions have been proposed, in this part of the tutorial we focus on plain continuous query answering.
RSPLib is the Python library that we are going to use in this tutorial. It offers abstractions to manipulate RSPSources, e.g. RDF Streams, and to interact with RSP engines, e.g. registering streams and queries.
RSPLib APIs are still under development. In this tutorial we are going to use version 0.3.4 which is available at https://pypi.python.org/pypi/rsplib/.
To upgrade it, just type !pip install rsplib --upgrade in a cell.
Source code is available at https://github.com/streamreasoning/rsplib
In [4]:
from IPython.display import Image
from IPython.core.display import HTML
from rsplib.processing import RSPSource, RSPEngine
TripleWave is a resource designed to publish RDF Streams on the web. RSPLab integrates TripleWave as a default tool for stream provisioning.
Have you already started the streams in the streamer/citybench folder?
Notably, since we are dealing with replayed RDF streams, we need to start them using a specific API. Wild streams, i.e. those naturally available on the web (like Wikipedia changes), do not need to be started.
In [33]:
stream0 = RSPSource("http://aarhustrafficdata182955", 4000)
stream0.sgraph()
Out[33]:
In [34]:
stream1 = RSPSource("http://aarhustrafficdata158505", 4001)
stream1.sgraph()
Out[34]:
In [5]:
stream0.start()
Out[5]:
In [ ]:
stream1.start()
Now that we started our streams, we can consider them as wild RDF streams that we can consume and query using an RSP Engine.
RSPLib abstracts RDF Streams as an RSPSource. Using this facade, it is possible to access the stream description published using TripleWave: the sGraph.
Are you curious to see what is inside the RDF Streams?
You might have noticed that the sGraph has a sld:contains property that contains the URIs of the stream items, called iGraphs.
However, in order to access the content of the iGraphs, one has to de-reference their URIs.
Click on the following link and let's see how to consume the content in a push-based manner, as RSP engines do.
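To make the sld:contains lookup concrete, here is a minimal sketch that pulls the iGraph URIs out of an sGraph. The document layout (a JSON-LD-like dictionary with an "@list" of "@id" entries), the example.org URIs, and the helper name igraph_uris are assumptions made for illustration; the sGraph returned by your streams may be shaped differently.

```python
# Hypothetical sGraph, shaped like a JSON-LD document. The real layout may differ.
sgraph = {
    "@id": "http://example.org/sgraph",
    "sld:contains": {
        "@list": [
            {"@id": "http://example.org/igraph/1"},
            {"@id": "http://example.org/igraph/2"},
        ]
    },
}


def igraph_uris(sgraph_doc):
    """Return the iGraph URIs listed under sld:contains, in order."""
    contains = sgraph_doc.get("sld:contains", [])
    # JSON-LD may wrap an ordered collection in {"@list": [...]}.
    if isinstance(contains, dict):
        contains = contains.get("@list", [])
    return [item["@id"] for item in contains if "@id" in item]


uris = igraph_uris(sgraph)
print(uris)
```

Each URI in the list would then be de-referenced with a plain HTTP GET, one request per iGraph. This is the pull-based alternative to the push-based consumption that RSP engines use.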
Now that we have our RDF Streams running, we can start thinking about how to query them. To this end, we are going to use the CSPARQL engine.
Have you started csparql in the consumer folder?
RSPLib offers a facade to communicate with RSP engines via the RSP Services (a RESTful interface for RSPs).
In [27]:
csparql = RSPEngine("http://csparql", 8182)
csparql.status()
Out[27]:
A first important step to answer queries over streams is the registration. As we saw during the initial part of this tutorial, Streams are unbounded sequences of data. Therefore, in order to access them we need special systems such as RSP engines.
RSP engines need to receive the data in a push-based manner. This process is initiated by registering the streams with the engine.
In [28]:
csparql.register_stream("AarhusTrafficData182955", "http://aarhustrafficdata182955:4000/sgraph")
Out[28]:
In [29]:
csparql.register_stream("AarhusTrafficData158505", "http://aarhustrafficdata158505:4001/sgraph")
Out[29]:
In [23]:
csparql.streams()
Out[23]:
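The two sGraph addresses passed to register_stream follow the same pattern: the host and port given to RSPSource, followed by /sgraph. The sketch below derives them instead of typing them by hand. The helper name sgraph_url is hypothetical, and the pattern is inferred from the cells above rather than taken from the RSPLib documentation.

```python
def sgraph_url(host, port):
    """Build the sGraph address from the host and port given to RSPSource."""
    return f"{host.rstrip('/')}:{port}/sgraph"


# Stream names and endpoints taken from the cells above.
streams = {
    "AarhusTrafficData182955": ("http://aarhustrafficdata182955", 4000),
    "AarhusTrafficData158505": ("http://aarhustrafficdata158505", 4001),
}

for name, (host, port) in streams.items():
    # With a running engine: csparql.register_stream(name, sgraph_url(host, port))
    print(name, sgraph_url(host, port))
```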
Now that our engine receives the data from the registered streams, we can issue a query to answer.
As we saw in the second part of this tutorial, queries over streams are different from queries over static data: they are continuous queries.
Continuous query answering produces results over time, therefore a query is registered and evaluated again and again.
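To see what "evaluated again and again" means in practice, here is a toy model in plain Python. It counts the items that fall inside a sliding time window and repeats the evaluation at every step. The function name, the sample events, and the exact window boundaries are assumptions chosen for illustration; the engine's real windowing semantics may differ.

```python
def continuous_count(events, range_s, step_s, until):
    """Re-evaluate a count over a sliding time window.

    events: list of (timestamp_seconds, item) pairs.
    Every step_s seconds, count the items with t - range_s < timestamp <= t.
    """
    results = []
    t = step_s
    while t <= until:
        window = [item for ts, item in events if t - range_s < ts <= t]
        results.append((t, len(window)))  # one answer per evaluation time
        t += step_s
    return results


events = [(1, "a"), (2, "b"), (4, "c"), (7, "d"), (8, "e")]
answers = continuous_count(events, range_s=4, step_s=2, until=8)
for evaluation_time, count in answers:
    print(evaluation_time, count)
```

The same query yields a different answer at each evaluation time, because the window has moved over the stream in the meantime.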
In the current example, we are using the CSPARQL engine to query our RDF Streams. This particular engine consumes queries written in a continuous extension of SPARQL called CSPARQL. An example query looks like this:
In [30]:
# Read the query text from the file shipped with the tutorial
with open('q1.csparql.txt', 'r') as csparq_query:
    body = csparq_query.read()
print(body)
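The file content is not reproduced here. As a rough illustration of what a query in this language can look like, the sketch below assembles one that returns every triple in a time window. The helper build_query, the placeholder stream IRI, and the window sizes are assumptions; this is not the content of q1.csparql.txt, and the body expected by the RSP Services may be formatted differently.

```python
def build_query(name, kind, stream_iri, window_range="5s", window_step="1s"):
    """Assemble an illustrative C-SPARQL query that returns every triple in the window."""
    if kind not in ("STREAM", "QUERY"):
        raise ValueError("kind must be 'STREAM' or 'QUERY'")
    # Assumption: a STREAM query uses CONSTRUCT so that its output is RDF again.
    form = "CONSTRUCT { ?s ?p ?o }" if kind == "STREAM" else "SELECT ?s ?p ?o"
    return (
        f"REGISTER {kind} {name} AS\n"
        f"{form}\n"
        f"FROM STREAM <{stream_iri}> [RANGE {window_range} STEP {window_step}]\n"
        "WHERE { ?s ?p ?o }"
    )


# Placeholder IRI; use the IRI under which your stream is known to the engine.
example_query = build_query("givemeall", "STREAM", "http://example.org/stream")
print(example_query)
```

The RANGE and STEP values set how much of the stream each evaluation sees and how often the query is re-evaluated.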
In order to register our query, we have to give it a name, e.g. givemeall, and decide its type. The query type can be either STREAM or QUERY. The former produces a stream as a result, i.e. each output is timestamped; the latter simply pushes out the results.
This simple difference is actually very important, since an output STREAM can be consumed by another RSP engine, while the output of a QUERY cannot.
In [31]:
csparql.register_query("givemeall", "STREAM", body)
Out[31]:
In [32]:
csparql.register_observer("givemeall", "default", {"host":"csparql","type":"ws","port":8283,"name":"default"})
Out[32]:
Are you curious what the output stream looks like?
RSPLab offers assisted real-time performance monitoring using cAdvisor and Grafana. To observe the current status of the engine, you can visit
http://localhost:3000/dashboard/db/csparql?orgId=1
If you have not logged in yet, use username: admin, password: admin.
In [6]:
Image(url="https://raw.githubusercontent.com/streamreasoning/rsplab/tutorial/collector/lab/streamapp/images/jasper_grafana.png")
Out[6]:
In [35]:
csparql.unregister_observer("givemeall", "default")
Out[35]:
In [36]:
csparql.unregister_query("givemeall")
Out[36]:
In [37]:
csparql.unregister_stream("AarhusTrafficData158505")
Out[37]:
In [38]:
csparql.unregister_stream("AarhusTrafficData182955")
Out[38]:
In [40]:
csparql.status()
Out[40]:
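The cleanup cells above undo the registrations in reverse order: first the observer, then the query, and finally the streams. One way to make sure this happens even when something fails in between is contextlib.ExitStack, sketched below. RecordingEngine is a hypothetical stand-in that only logs method names, so the example runs without a live engine; run_session is likewise an illustrative helper and not part of RSPLib.

```python
from contextlib import ExitStack


class RecordingEngine:
    """Hypothetical stand-in for RSPEngine that only records the calls it receives."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, method):
        # Any method call, e.g. register_stream(...), is logged instead of sent.
        def record(*args):
            self.calls.append((method, args[0]))
        return record


def run_session(engine, streams, query_name, body, observer):
    """Register everything, then unregister in reverse order on exit."""
    with ExitStack() as stack:
        for name, url in streams.items():
            engine.register_stream(name, url)
            stack.callback(engine.unregister_stream, name)
        engine.register_query(query_name, "STREAM", body)
        stack.callback(engine.unregister_query, query_name)
        engine.register_observer(query_name, observer["name"], observer)
        stack.callback(engine.unregister_observer, query_name, observer["name"])
        # ... consume the results here ...


engine = RecordingEngine()
run_session(
    engine,
    {
        "AarhusTrafficData182955": "http://aarhustrafficdata182955:4000/sgraph",
        "AarhusTrafficData158505": "http://aarhustrafficdata158505:4001/sgraph",
    },
    "givemeall",
    "query text goes here",
    {"host": "csparql", "type": "ws", "port": 8283, "name": "default"},
)
for method, target in engine.calls:
    print(method, target)
```

ExitStack runs its callbacks last-in, first-out, so the teardown mirrors the manual cells above. Passing the real csparql object instead of the stand-in should work as long as it exposes the same method names used in this notebook.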